package com.trytech.mongoocrawler.server.common.queue;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.trytech.mongoocrawler.server.CrawlerSession;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Owns the two LMAX Disruptor pipelines used together with a {@link CrawlerSession}:
 * one carrying {@link UrlResultFetcherEvent}s and one carrying
 * {@link WebResultFetcherEvent}s.
 *
 * <p>Both disruptors are configured and started in the constructor, so their ring
 * buffers are available as soon as {@link #getInstance(CrawlerSession)} returns.
 * {@link #start()} only flips the state reported by {@link #isStarted()}.
 *
 * <p>Each context owns its own thread pool; {@link #close()} shuts down both
 * disruptors and that pool.
 *
 * Created by hp on 2017-1-25.
 */
public class DisruptorContext {
    // Number of slots in each ring buffer.
    private static final int RINGBUFFER_SIZE = 128;
    // Number of threads in the pool that runs the disruptors' event processors.
    private static final int POOL_SIZE = 20;

    // Per-instance pool: every context creates (and later shuts down) its own executor,
    // so closing one context can never affect the threads of another.
    private final ExecutorService executor;
    private final CrawlerSession crawlerSession;
    private final Disruptor<UrlResultFetcherEvent> urlDisruptor;
    private final Disruptor<WebResultFetcherEvent> webResultDisruptor;
    // Consumers of WebResult events (currently a single handler).
    private final WebResultFetcherWorkHandler[] webResultFetcherWorkHandlerArray = new WebResultFetcherWorkHandler[1];
    // Ring buffer producers publish WebResult events to.
    private final RingBuffer<WebResultFetcherEvent> webResultRingBuffer;
    // Consumers of UrlResult events (currently a single handler).
    private final UrlResultFetcherWorkHandler[] urlResultFetcherWorkHandlerArray = new UrlResultFetcherWorkHandler[1];
    // Ring buffer producers publish UrlResult events to.
    private final RingBuffer<UrlResultFetcherEvent> urlResultRingBuffer;
    // Logical state reported by isStarted(); toggled by start() and close().
    private RunState runState = RunState.STOPPED;

    /**
     * Builds and starts both disruptor pipelines.
     *
     * <p>NOTE(review): {@code this} is handed to the work handlers, and the disruptors
     * are started, before the constructor returns; handlers must not rely on the
     * context being fully initialised while it is still being constructed.
     *
     * @param crawlerSession the session exposed through {@link #getCrawlerSession()}
     */
    private DisruptorContext(CrawlerSession crawlerSession){
        this.crawlerSession = crawlerSession;
        this.executor = Executors.newFixedThreadPool(POOL_SIZE);

        urlDisruptor = new Disruptor<UrlResultFetcherEvent>(new UrlFetcherEventFactory(), RINGBUFFER_SIZE, executor, ProducerType.MULTI, new BlockingWaitStrategy());
        for(int i=0;i<urlResultFetcherWorkHandlerArray.length;i++){
            urlResultFetcherWorkHandlerArray[i] = new UrlResultFetcherWorkHandler(this);
        }
        // The exception handler only applies to handlers registered AFTER this call,
        // so it has to be installed before the worker pool is created.
        urlDisruptor.handleExceptionsWith(new UrlResultFetcherExceptionHandler());
        // Register every handler in the array, not just the first one.
        urlDisruptor.handleEventsWithWorkerPool(urlResultFetcherWorkHandlerArray);
        urlResultRingBuffer = urlDisruptor.start();

        webResultDisruptor = new Disruptor<WebResultFetcherEvent>(new WebFetcherEventFactory(), RINGBUFFER_SIZE, executor, ProducerType.MULTI, new BlockingWaitStrategy());
        for(int i=0;i<webResultFetcherWorkHandlerArray.length;i++){
            webResultFetcherWorkHandlerArray[i] = new WebResultFetcherWorkHandler(this);
        }
        // Same ordering requirement as above: exception handler first, then handlers.
        webResultDisruptor.handleExceptionsWith(new WebResultFetcherExceptionHandler());
        webResultDisruptor.handleEventsWithWorkerPool(webResultFetcherWorkHandlerArray);
        webResultRingBuffer = webResultDisruptor.start();
    }

    /**
     * Creates a new, already running context for the given session.
     *
     * <p>Despite the name, this is a plain factory: every call returns a fresh
     * instance with its own disruptors and thread pool. Callers are responsible for
     * calling {@link #close()} on each instance they obtain.
     *
     * @param crawlerSession the session the new context is bound to
     * @return a new {@code DisruptorContext}
     */
    public static DisruptorContext getInstance(CrawlerSession crawlerSession) {
        return new DisruptorContext(crawlerSession);
    }

    /**
     * Marks the context as running.
     *
     * <p>The disruptors themselves are already started by the constructor, so no
     * further start-up work is needed (or possible) here.
     */
    public void start(){
        runState = RunState.RUNNING;
    }

    /**
     * @return the crawler session this context was created with
     */
    public CrawlerSession getCrawlerSession(){
        return this.crawlerSession;
    }

    /**
     * @return the ring buffer of the WebResult pipeline
     */
    public RingBuffer<WebResultFetcherEvent> getWebResultRingBuffer(){
        return webResultRingBuffer;
    }

    /**
     * @return the ring buffer of the UrlResult pipeline
     */
    public RingBuffer<UrlResultFetcherEvent> getUrlResultRingBuffer(){
        return urlResultRingBuffer;
    }

    /**
     * @return {@code true} once {@link #start()} has been called and {@link #close()}
     *         has not been called since
     */
    public boolean isStarted(){
        return runState == RunState.RUNNING;
    }

    /**
     * Shuts down both disruptors and then the thread pool, and marks the context as
     * stopped.
     *
     * <p>The clean-up steps are chained with {@code finally} so that a failure while
     * shutting down one component does not leave the remaining ones running.
     */
    public void close(){
        try {
            urlDisruptor.shutdown();
        } finally {
            try {
                webResultDisruptor.shutdown();
            } finally {
                executor.shutdown();
                runState = RunState.STOPPED;
            }
        }
    }

    // Run states of the context.
    private enum RunState {
        STOPPED, RUNNING
    }
}
